import re
import os
import sys
import ssl
import socket
import hashlib
import traceback
from types import ModuleType
from typing import Generator
from subprocess import Popen
from contextlib import contextmanager

from mechanize import Browser, Request, HTTPError
from mechanize._response import response_seek_wrapper as Response

from calibre import get_proxies  # type: ignore
from calibre.utils.logging import Log  # type: ignore

from ..vendor import socks
from ..vendor.cssselect import GenericTranslator, SelectorError


# XHTML namespace map used as the prefix table for XPath queries.
ns = {'x': 'http://www.w3.org/1999/xhtml'}
# True when the module is loaded from within a unittest run.
is_test = 'unittest' in sys.modules
# Module-wide logger; DEBUG verbosity when CALIBRE_DEBUG is set in the env.
log = Log(level=Log.DEBUG if os.environ.get('CALIBRE_DEBUG') else Log.INFO)

# Keep a reference to the unpatched socket class so socks_proxy() can
# detect and undo its monkey-patch later.
log.debug('Backup original socket: ', id(socket.socket))
original_socket = socket.socket


def dummy(*args, **kwargs):
    """No-op placeholder: accepts any arguments and returns None."""
    return None


def sep(char='═', count=38):
    """Return a visual separator: *char* repeated *count* times."""
    return count * char


def css(selector):
    """Translate a CSS selector into an XPath expression.

    Uses the vendored cssselect translator with a ``self::x:`` prefix;
    returns None when the selector cannot be translated.
    """
    translator = GenericTranslator()
    try:
        return translator.css_to_xpath(selector, prefix='self::x:')
    except SelectorError:
        return None


def css_to_xpath(selectors):
    """Translate each selector via css(), dropping untranslatable ones."""
    return [rule for selector in selectors if (rule := css(selector))]


def create_xpath(selectors):
    """Build a single ``.//*[...]`` XPath ORing the translated selectors.

    A bare string is treated as a one-element selector list.
    """
    if isinstance(selectors, str):
        selectors = (selectors,)
    return './/*[{}]'.format(' or '.join(css_to_xpath(selectors)))


def uid(*args):
    """Return the hex MD5 digest of all arguments concatenated.

    str arguments are encoded as UTF-8 before hashing; bytes are
    hashed as-is.
    """
    digest = hashlib.md5()
    for item in args:
        if not isinstance(item, bytes):
            item = item.encode('utf-8')
        digest.update(item)
    return digest.hexdigest()


def trim(text):
    """Normalize whitespace in *text* and strip non-printable characters."""
    # NBSP / ideographic space become a regular space (Python 2 compat).
    text = re.sub('\u00a0|\u3000', ' ', text)
    # Drop zero-width space and BOM artifacts emitted by some engines.
    text = re.sub('\u200b|\ufeff', '', text)
    # Collapse every run of whitespace into a single space.
    text = re.sub(r'\s+', ' ', text)
    # Remove remaining control characters, keeping \n, \r and \t.
    text = re.sub(r'(?![\n\r\t])[\x00-\x1f\x7f-\xa0\xad]', '', text)
    return text.strip()


def chunk(items, length=0):
    """Yield *items* split into at most *length* contiguous slices.

    When length < 1, every item is yielded as its own one-element list.
    """
    if length < 1:
        yield from ([item] for item in items)
        return
    total = len(items)
    if length > total:
        length = total
    size = total / length
    for index in range(length):
        yield items[int(size * index):int(size * (index + 1))]


def group(numbers):
    """Collapse an iterable of ints into sorted inclusive (start, end) ranges.

    Example: [3, 1, 2, 5] -> [(1, 3), (5, 5)].

    :param numbers: iterable of integers (any order, duplicates tolerated
        within a run only when consecutive values differ by exactly 1).
    :return: list of (start, end) tuples; [] for empty input.
    """
    ranges: list[tuple[int, ...]] = []
    current_range: list[int] = []
    for number in sorted(numbers):
        if not current_range:
            current_range = [number, number]
        elif number - current_range[-1] == 1:
            # Consecutive value: extend the open range.
            current_range[-1] = number
        else:
            ranges.append(tuple(current_range))
            current_range = [number, number]
    # Guard the final append: with no input there is no open range, and the
    # unconditional append would have produced a bogus empty tuple ().
    if current_range:
        ranges.append(tuple(current_range))
    return ranges


def sorted_mixed_keys(s):
    """Natural-sort key: digit runs compare as ints, the rest as text.

    https://docs.python.org/3/reference/expressions.html#value-comparisons
    """
    parts = re.split(r'(\d+)', s)
    return [int(part) if part.isdigit() else part for part in parts]


def is_str(data):
    """True when data's concrete type is str (or Python 2 unicode).

    Checks the exact type name, so subclasses of str do not match.
    """
    name = type(data).__name__
    return name == 'str' or name == 'unicode'


def is_proxy_available(host, port, timeout=1):
    """Probe host:port with a TCP connection; True when reachable.

    A leading ``http://`` scheme on *host* is stripped; any failure
    (resolution, refusal, timeout) yields False.
    """
    try:
        address = (host.replace('http://', ''), int(port))
        socket.create_connection(address, timeout).close()
    except Exception:
        return False
    return True


def size_by_unit(number, unit='KB'):
    """Convert a byte count to KB or MB (decimal units, 2 decimals).

    Unit matching is case-insensitive; unknown units fall back to KB.
    """
    exponents = {'KB': 1, 'MB': 2}
    exponent = exponents.get(unit.upper(), exponents['KB'])
    return round(float(number) / 1000 ** exponent, 2)


def open_path(path):
    """Reveal *path* with the platform's file opener (fire-and-forget)."""
    platform = sys.platform
    if platform.startswith('win32'):
        cmd = 'explorer'
    elif platform.startswith('linux'):
        cmd = 'xdg-open'
    else:
        cmd = 'open'  # macOS and other Unix-likes
    Popen([cmd, path])


def open_file(path, encoding='utf-8'):
    """Return the entire text content of *path* (universal newlines)."""
    with open(path, newline=None, encoding=encoding) as stream:
        return stream.read()


def traceback_error():
    """Format the active exception's traceback, unchained and stripped."""
    formatted = traceback.format_exc(chain=False)
    return formatted.strip()


def request(
        url, data=None, headers=None, method='GET', timeout=30,
        proxy_uri=None, raw_object=False) -> Response | str | None:
    """Fetch *url* via Mechanize and return the decoded response body.

    :param url: Target URL.
    :param data: Optional request payload.
    :param headers: Optional mapping of extra HTTP headers.
    :param method: HTTP verb, default GET.
    :param timeout: Socket timeout in seconds.
    :param proxy_uri: Explicit proxy URI; when None, calibre's proxy
        settings are consulted instead.
    :param raw_object: When True, return the raw mechanize response
        object instead of the decoded, stripped text.
    :raises Exception: combining the local traceback and the server's
        error body when an HTTP error status is returned.
    """
    # Avoid the shared-mutable-default pitfall: build a fresh dict per call.
    headers = {} if headers is None else headers
    br = Browser()
    br.set_handle_robots(False)

    # Prefer a verifying SSL context; fall back to an unverified one so a
    # broken local certificate store does not make every request fail.
    try:
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = True
        ssl_context.verify_mode = ssl.CERT_REQUIRED
        br.set_ca_data(context=ssl_context)
    except Exception:
        br.set_ca_data(
            context=ssl._create_unverified_context(cert_reqs=ssl.CERT_NONE))

    # Set up a proxy; use the explicit URI if given, otherwise fall back to
    # calibre's proxy configuration.
    proxies: dict = {}
    if proxy_uri is not None:
        proxies.update(http=proxy_uri, https=proxy_uri)
    else:
        env_proxies = get_proxies(False)  # read once instead of twice
        http = env_proxies.get('http')
        if http is not None:
            proxies.update(http=http, https=http)
        https = env_proxies.get('https')
        if https is not None:
            proxies.update(https=https)
    if len(proxies) > 0:
        br.set_proxies(proxies)

    # Make Mechanize raise detailed information when an HTTP error occurs.
    try:
        _request = Request(
            url, data, headers=headers, timeout=timeout, method=method)
        br.open(_request)
        response: Response | None = br.response()
        if response is None or raw_object:
            return response
        return response.read().decode('utf-8').strip()
    except HTTPError as e:
        # Chain the original error so the cause survives in the traceback.
        raise Exception(
            traceback_error() + '\n\n' + e.read().decode('utf-8')) from e


@contextmanager
def socks_proxy(host: str, port: int) -> Generator[ModuleType, None, None]:
    """Monkey-patch the socket module so that network traffic goes through
    a SOCKS5 proxy at host:port. The context manager restores the original
    socket class and the environment proxies after it exits.

    :param host: SOCKS5 proxy hostname or IP address.
    :param port: SOCKS5 proxy port (coerced with ``int()``).
    :yields: the patched ``socket`` module.
    """
    # Temporarily remove environment proxies to prevent conflicts with the
    # SOCKS5 proxy, which might otherwise send connections through an HTTP
    # proxy, causing a "General SOCKS server failure" error.
    backup_http = os.environ.pop("http_proxy", None)
    backup_https = os.environ.pop("https_proxy", None)
    if socket.socket is not original_socket:
        # Another caller already swapped the socket class in; leave it alone.
        log.debug('Socket already patched: ', id(socket.socket))
    else:
        # TODO: There is a bug in asyncio environments where the socket may be
        # patched multiple times due to race conditions.
        log.debug('Patch socket: ', id(socks.socksocket))
        # rdns=True defers hostname resolution to the proxy side.
        socks.set_default_proxy(socks.SOCKS5, host, int(port), rdns=True)
        socket.socket = socks.socksocket
    try:
        yield socket
    finally:
        # Undo the patch only when the socket class is still swapped out.
        if socket.socket is not original_socket:
            log.debug('Restore original socket: ', id(original_socket))
            socket.socket = original_socket
            socks.set_default_proxy(None)
        # Restore the environment proxies if any exist.
        if backup_http is not None:
            os.environ['http_proxy'] = backup_http
        if backup_https is not None:
            os.environ['https_proxy'] = backup_https
